package cn.gbase.rxdata;
import javax.sql.DataSource;
import java.io.Closeable;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static org.joor.Reflect.*;

/**
 * Runs SQL queries against a {@link DataSource} and exposes the rows as a lazily
 * evaluated {@link Stream}.
 *
 * <p>Every returned stream owns a JDBC connection and statement. They are released
 * when the stream is closed, when the last row has been read, or when reading a row
 * fails. Short-circuiting pipelines ({@code findFirst}, {@code limit}, ...) do not
 * read to the end, so callers should still wrap the stream in try-with-resources.
 */
public class DataStreamTemplate {
    private final DataSource dataSource;

    /**
     * @param dataSource source of the connections used by every query
     */
    public DataStreamTemplate(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * Streams the rows of {@code sql}, one map per row keyed by column label.
     *
     * @param sql the query to execute; it is sent to the database verbatim
     * @return a stream of row maps; close it to release the connection
     * @throws SQLException if the connection cannot be obtained or the query fails
     */
    public Stream<Map> query(String sql) throws SQLException {
        return new StreamQuery().stream(sql);
    }

    /**
     * Streams the rows of {@code sql} as instances of {@code clazz}.
     *
     * <p>The mapping is fixed: column 1 is read as an int into the field {@code id}
     * and column 2 as a string into the field {@code name}.
     *
     * @param sql   the query to execute; it is sent to the database verbatim
     * @param clazz the type instantiated for every row
     * @param <T>   the element type of the returned stream
     * @return a stream of mapped rows; close it to release the connection
     * @throws SQLException if the connection cannot be obtained or the query fails
     */
    public <T> Stream<T> query(String sql, Class  clazz) throws SQLException, InstantiationException, IllegalAccessException {
        return new StreamQuery().stream(sql,clazz);
    }

    /**
     * One running query: holds the connection and statement backing a result stream.
     */
    class StreamQuery implements Closeable {

        private Connection connection;
        private Statement statement;
        private long dataIndex = 0;
        /** True once {@link #close()} has run for the currently opened query. */
        private boolean closed;

        /**
         * Executes {@code sql} and streams each row as a map of column label to value.
         *
         * <p>Every row is delivered in its own map, so elements stay valid after the
         * stream has advanced (for example when they are collected into a list).
         *
         * @param sql the query to execute
         * @return a sequential stream over the rows
         * @throws SQLException if the query cannot be started
         */
        public Stream<Map> stream(String sql) throws SQLException {
            final ResultSet rs;
            final String[] labels;
            try {
                rs = open(sql);
                // Column labels cannot change while iterating, so read them once.
                int columns = rs.getMetaData().getColumnCount();
                labels = new String[columns];
                for (int i = 1; i <= columns; i++) {
                    labels[i - 1] = rs.getMetaData().getColumnLabel(i);
                }
            } catch (SQLException | RuntimeException e) {
                // No stream exists yet, so nobody else could release these resources.
                close();
                throw e;
            }

            Spliterator<Map> rows = new RowSpliterator<Map>(rs) {
                @Override
                Map mapRow(ResultSet row) throws SQLException {
                    Map<String, Object> values = new HashMap<>(labels.length);
                    for (int i = 1; i <= labels.length; i++) {
                        values.put(labels[i - 1], row.getObject(i));
                    }
                    return values;
                }
            };
            return StreamSupport.stream(rows, false).onClose(this::close);
        }


        /**
         * Executes {@code sql} and streams each row as a new instance of {@code clazz}.
         *
         * <p>Column 1 is stored as an int in the field {@code id} and column 2 as a
         * string in the field {@code name}; other columns are ignored.
         *
         * @param sql   the query to execute
         * @param clazz the type instantiated for every row
         * @param <T>   the element type of the returned stream
         * @return a sequential stream over the mapped rows
         * @throws SQLException if the query cannot be started
         */
        public <T> Stream<T> stream(String sql,Class clazz) throws SQLException, IllegalAccessException, InstantiationException {
            dataIndex = 0;
            final ResultSet rs;
            try {
                rs = open(sql);
            } catch (SQLException | RuntimeException e) {
                // No stream exists yet, so nobody else could release these resources.
                close();
                throw e;
            }

            Spliterator<T> rows = new RowSpliterator<T>(rs) {
                @Override
                T mapRow(ResultSet row) throws SQLException {
                    ++dataIndex;
                    System.out.println("clazz => "+clazz.getName()+" , read data index => "+dataIndex);
                    T t = on(clazz).create().set("id",row.getInt(1)).set("name",row.getString(2)).get();
                    return t;
                }
            };
            return StreamSupport.stream(rows, false).onClose(this::close);
        }

        /**
         * Opens a connection, runs {@code sql} on a forward-only, read-only statement
         * and returns the observed proxy of the result set.
         *
         * <p>On failure the resources opened so far stay in the fields; the caller is
         * expected to invoke {@link #close()}.
         */
        private ResultSet open(String sql) throws SQLException {
            closed = false;
            connection = dataSource.getConnection();

            statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            // NOTE(review): Integer.MIN_VALUE is the MySQL Connector/J convention for
            // row-by-row streaming; presumably the target driver honours it as well.
            // Other drivers may reject a negative fetch size - confirm.
            statement.setFetchSize(Integer.MIN_VALUE);

            ResultSet rsa = statement.executeQuery(sql);
            Observer ob = new Observer();
            return ResultSetDynamicProxyHandler.newInstanceWithObserver(rsa,ob);
        }

        /**
         * Releases the statement and the connection. Failures while closing are
         * ignored, and repeated calls for the same query do nothing.
         */
        @Override
        public void close() {
            if (closed) {
                return;
            }
            closed = true;
            System.out.println("on close !!!!!!!!!!!!!!!!!!!!!!!!!!!");
            closeQuietly(statement);
            statement = null;
            closeQuietly(connection);
            connection = null;
        }

        /** Closes {@code resource} if it is non-null, ignoring any failure. */
        private void closeQuietly(AutoCloseable resource) {
            if (resource == null) {
                return;
            }
            try {
                resource.close();
            } catch (Exception ignored) {
                // Best effort: a failing close must not prevent releasing the
                // remaining resources, and there is nothing useful to do here.
            }
        }

        /**
         * Spliterator over a result set that delegates row conversion to
         * {@link #mapRow(ResultSet)} and closes the query once the rows are exhausted
         * or a row cannot be read.
         */
        private abstract class RowSpliterator<R> extends Spliterators.AbstractSpliterator<R> {
            private final ResultSet rows;
            private boolean exhausted;

            RowSpliterator(ResultSet rows) {
                super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE);
                this.rows = rows;
            }

            /** Converts the row the cursor currently points at into a stream element. */
            abstract R mapRow(ResultSet row) throws SQLException;

            @Override
            public boolean tryAdvance(Consumer<? super R> action) {
                if (exhausted) {
                    // The result set is already closed; never touch it again.
                    return false;
                }
                R element;
                try {
                    if (!rows.next()) {
                        exhausted = true;
                        // Terminal operations do not trigger onClose handlers, so
                        // release the connection as soon as the last row is read.
                        StreamQuery.this.close();
                        return false;
                    }
                    element = mapRow(rows);
                } catch (SQLException e) {
                    exhausted = true;
                    StreamQuery.this.close();
                    throw new RuntimeException(e);
                }
                action.accept(element);
                return true;
            }
        }
    }

}
